前言
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,广泛应用于实时通信场景——聊天室、在线协作文档、实时通知、游戏等。
NestJS 提供了 Gateway 抽象,将 WebSocket 的底层复杂性封装为类似于 Controller 的声明式 API,使开发者能以熟悉的方式构建实时应用。
本文将以搭建一个实时聊天室为主线,全面介绍 NestJS WebSocket Gateway 的完整 API 和最佳实践。
本文基于 Socket.IO 适配器。NestJS 的 WebSocket 模块是平台无关的,也支持 ws 库。
安装
pnpm add @nestjs/websockets @nestjs/platform-socket.io
Gateway 完整 API 清单
1. @WebSocketGateway() - 类装饰器
标记一个类为 WebSocket Gateway。NestJS 会自动初始化该 Gateway 并管理其生命周期。
@WebSocketGateway()
export class ChatGateway {}
配置选项:
@WebSocketGateway({
namespace: 'chat', // 命名空间隔离
path: '/ws', // WebSocket 连接路径
cors: { origin: '*' }, // CORS 配置
transports: ['websocket'], // 传输方式
pingTimeout: 5000, // 心跳超时
pingInterval: 10000, // 心跳间隔
})
export class ChatGateway {}
| 选项 | 说明 |
|---|---|
namespace | 命名空间,用于业务隔离 |
path | WebSocket 连接路径 |
cors | CORS 配置 |
transports | 传输方式(websocket / polling) |
pingTimeout / pingInterval | 心跳超时和间隔 |
使用场景: 创建实时通信服务入口。可按业务模块划分命名空间,如 chat、notifications、live。
2. @WebSocketServer() - 属性装饰器
注入底层 WebSocket Server 实例(Socket.IO 的 Server 或 Namespace 类型)。NestJS 在 Gateway 初始化后自动赋值。
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
@WebSocketGateway()
export class ChatGateway {
@WebSocketServer()
server: Server;
// 向所有客户端广播
broadcast(message: string) {
this.server.emit('message', message);
}
}
使用场景: 向客户端广播消息、管理房间、获取连接的客户端列表。
注意: 被
@WebSocketServer()装饰的属性不能手动赋值。
3. @SubscribeMessage(eventName) - 方法装饰器
订阅特定客户端事件。事件名与客户端 socket.emit(eventName, data) 匹配时触发。
import { SubscribeMessage } from '@nestjs/websockets';
@WebSocketGateway()
export class ChatGateway {
@SubscribeMessage('sendMessage')
handleMessage(client: any, payload: any) {
return { event: 'receiveMessage', data: payload };
}
}
使用场景: 处理客户端发来的各类事件——发消息、加入房间、更新状态等。
4. @MessageBody() - 参数装饰器
提取传入消息的载荷数据。支持 Pipe 验证,也可提取嵌套属性。
// 获取完整载荷
@SubscribeMessage('sendMessage')
handleMessage(@MessageBody() data: ChatMessage) {
this.server.emit('receiveMessage', data);
}
// 提取特定字段
@SubscribeMessage('sendMessage')
handleMessage(@MessageBody('content') content: string) {
this.server.emit('receiveMessage', { content });
}
// 配合 Pipe 验证
@SubscribeMessage('createMessage')
handleCreate(@MessageBody(new ValidationPipe()) dto: CreateMessageDto) {
// ...
}
使用场景: 获取客户端发送的消息内容、用户输入数据,配合 Pipe 进行数据验证。
5. @ConnectedSocket() - 参数装饰器
注入当前连接的客户端 Socket 实例,用于向单个客户端发送消息或获取连接信息。
import { ConnectedSocket } from '@nestjs/websockets';
import { Socket } from 'socket.io';
@SubscribeMessage('identify')
handleIdentify(@ConnectedSocket() client: Socket) {
console.log('Client ID:', client.id);
client.emit('identified', { userId: client.id });
}
使用场景: 向特定用户发送私信、获取客户端 ID/IP、操作单个连接。
6. @Ack() - 参数装饰器
提取客户端事件中的 ACK 回调函数。用于客户端发送事件后等待服务端响应的场景。
import { Ack } from '@nestjs/websockets';
@SubscribeMessage('saveMessage')
async handleSave(
@MessageBody() data: ChatMessage,
@Ack() ack: (response: { id: string }) => void,
) {
const saved = await this.messageService.save(data);
ack({ id: saved.id });
}
客户端对应写法:
socket.emit('saveMessage', { content: 'Hello' }, (response) => {
console.log('Server responded with id:', response.id);
});
使用场景: 客户端需要确认服务端已处理消息(消息发送确认、操作结果返回)。
7. 生命周期接口
NestJS 提供了三个生命周期接口,Gateway 实现后可在特定时机执行代码。
| 接口 | 方法 | 触发时机 |
|---|---|---|
OnGatewayInit | afterInit(server) | Gateway 初始化后 |
OnGatewayConnection | handleConnection(client) | 客户端连接时 |
OnGatewayDisconnect | handleDisconnect(client) | 客户端断开时 |
import {
WebSocketGateway,
WebSocketServer,
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
@WebSocketGateway()
export class ChatGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer() server: Server;
private logger: Logger = new Logger('ChatGateway');
afterInit(server: Server) {
this.logger.log('Chat Gateway initialized');
}
handleConnection(client: Socket) {
this.logger.log(`Client connected: ${client.id}`);
}
handleDisconnect(client: Socket) {
this.logger.log(`Client disconnected: ${client.id}`);
this.server.emit('userLeft', { userId: client.id });
}
}
使用场景: 连接管理、在线用户统计、资源清理、初始化日志。
8. 跨切面装饰器
以下装饰器来自 @nestjs/common 和 @nestjs/core,可在 Gateway 级别或方法级别使用:
| 装饰器 | 说明 |
|---|---|
@UseGuards() | 应用 Guard(如 JWT 鉴权) |
@UsePipes() | 应用 Pipe(数据验证/转换) |
@UseInterceptors() | 应用 Interceptor(日志/转换) |
@UseFilters() | 应用 Exception Filter |
import { UseGuards, UsePipes, UseFilters, UseInterceptors } from '@nestjs/common';
@WebSocketGateway()
@UseGuards(WsJwtGuard)
export class ChatGateway {
@SubscribeMessage('sendMessage')
@UsePipes(new ValidationPipe())
handleMessage(@MessageBody() dto: SendMessageDto) {
// ...
}
@SubscribeMessage('adminAction')
@UseGuards(AdminGuard)
@UseFilters(new WsExceptionFilter())
handleAdminAction(@ConnectedSocket() client: Socket) {
// ...
}
}
9. 工具类
WsException — WebSocket 专用异常
import { WsException } from '@nestjs/websockets';
@SubscribeMessage('getProfile')
async handleGetProfile(
@MessageBody('userId') userId: string,
) {
const user = await this.userService.find(userId);
if (!user) {
throw new WsException('User not found');
}
return user;
}
WsResponse<T> — 标准响应格式
用于返回自定义事件名和数据的场景。
import { WsResponse } from '@nestjs/websockets';
@SubscribeMessage('events')
handleEvent(): WsResponse<string> {
return { event: 'items', data: 'response data' };
}
BaseWsExceptionFilter — 异常过滤器基类
import { Catch, ArgumentsHost } from '@nestjs/common';
import { BaseWsExceptionFilter, WsException } from '@nestjs/websockets';
@Catch()
export class WsExceptionFilter extends BaseWsExceptionFilter {
catch(exception: unknown, host: ArgumentsHost) {
super.catch(exception, host);
}
}
消息响应方式
Gateway 处理器支持多种响应方式:
| 方式 | 返回值 | 客户端收到 |
|---|---|---|
| 同步返回 | return data | { event: '事件名', data } |
| 返回 WsResponse | return { event: 'xxx', data } | 自定义事件名 |
| 返回 Promise | return Promise.resolve(data) | 异步响应 |
| 返回 Observable | return of(data1, data2) | 多次推送 |
| 手动 emit | client.emit(...) | 完全控制 |
// 1. 同步返回
@SubscribeMessage('sync')
handleSync() {
return 'Hello';
}
// 2. 返回 WsResponse(自定义事件名)
@SubscribeMessage('custom')
handleCustom(): WsResponse<string> {
return { event: 'customEvent', data: 'Hello' };
}
// 3. 返回 Promise(异步)
@SubscribeMessage('async')
async handleAsync(): Promise<string> {
return await this.service.getData();
}
// 4. 返回 Observable(多次推送)
@SubscribeMessage('stream')
handleStream(): Observable<WsResponse<number>> {
return interval(1000).pipe(
map(i => ({ event: 'tick', data: i })),
take(5),
);
}
// 5. 手动 emit(完全控制)
@SubscribeMessage('manual')
handleManual(@ConnectedSocket() client: Socket) {
client.emit('response', { custom: 'data' });
}
实战:搭建实时聊天室
架构设计
客户端 (Socket.IO)
↕
Gateway (鉴权 → 验证 → 处理)
↕
Service (业务逻辑)
↕
广播 / 房间
1. 定义消息 DTO
// src/chat/dto/send-message.dto.ts
import { IsNotEmpty, IsString, MaxLength } from 'class-validator';
export class SendMessageDto {
@IsNotEmpty()
@IsString()
@MaxLength(1000)
content: string;
@IsString()
roomId?: string;
}
2. 创建 WebSocket Guard
HTTP 的 Guard 不能直接用于 WebSocket,需要适配:
// src/auth/ws-jwt.guard.ts
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
import { JwtService } from '@nestjs/jwt';
@Injectable()
export class WsJwtGuard implements CanActivate {
constructor(private jwtService: JwtService) {}
canActivate(context: ExecutionContext): boolean {
const client = context.switchToWs().getClient<{ handshake: { headers: { authorization?: string } } }>();
const token = client.handshake.headers.authorization?.replace('Bearer ', '');
if (!token) {
throw new WsException('Unauthorized');
}
try {
const user = this.jwtService.verify(token);
client.user = user;
return true;
} catch {
throw new WsException('Invalid token');
}
}
}
3. 创建 Exception Filter
// src/chat/filters/ws-exception.filter.ts
import { Catch, ArgumentsHost } from '@nestjs/common';
import { BaseWsExceptionFilter, WsException } from '@nestjs/websockets';
import { Socket } from 'socket.io';
@Catch(WsException)
export class ChatWsExceptionFilter extends BaseWsExceptionFilter {
catch(exception: WsException, host: ArgumentsHost) {
const client = host.switchToWs().getClient<Socket>();
client.emit('error', { message: exception.message });
}
}
4. 完整 Gateway 实现
// src/chat/chat.gateway.ts
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
ConnectedSocket,
MessageBody,
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
WsException,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger, UseGuards, UsePipes, UseFilters, ValidationPipe } from '@nestjs/common';
import { ChatService } from './chat.service';
import { SendMessageDto } from './dto/send-message.dto';
import { WsJwtGuard } from '../auth/ws-jwt.guard';
import { ChatWsExceptionFilter } from './filters/ws-exception.filter';
@WebSocketGateway({
namespace: 'chat',
cors: { origin: '*' },
})
@UseGuards(WsJwtGuard)
@UseFilters(ChatWsExceptionFilter)
export class ChatGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer()
server: Server;
private logger: Logger = new Logger('ChatGateway');
private onlineUsers = new Map<string, string>(); // socketId -> userId
afterInit() {
this.logger.log('Chat Gateway initialized');
}
handleConnection(client: Socket) {
const userId = (client as any).user?.sub;
if (userId) {
this.onlineUsers.set(client.id, userId);
this.logger.log(`User ${userId} connected (${client.id})`);
this.server.emit('userJoined', { userId, socketId: client.id });
}
}
handleDisconnect(client: Socket) {
const userId = this.onlineUsers.get(client.id);
if (userId) {
this.onlineUsers.delete(client.id);
this.logger.log(`User ${userId} disconnected (${client.id})`);
this.server.emit('userLeft', { userId });
}
}
@SubscribeMessage('joinRoom')
handleJoinRoom(
@ConnectedSocket() client: Socket,
@MessageBody('roomId') roomId: string,
) {
client.join(roomId);
this.logger.log(`User joined room: ${roomId}`);
return { event: 'roomJoined', data: { roomId } };
}
@SubscribeMessage('leaveRoom')
handleLeaveRoom(
@ConnectedSocket() client: Socket,
@MessageBody('roomId') roomId: string,
) {
client.leave(roomId);
this.logger.log(`User left room: ${roomId}`);
return { event: 'roomLeft', data: { roomId } };
}
@SubscribeMessage('sendMessage')
@UsePipes(new ValidationPipe())
async handleMessage(
@ConnectedSocket() client: Socket,
@MessageBody() dto: SendMessageDto,
) {
const userId = (client as any).user?.sub;
const roomId = dto.roomId || 'global';
const message = {
id: Date.now().toString(),
content: dto.content,
userId,
roomId,
timestamp: new Date().toISOString(),
};
// 保存到数据库
await this.chatService.saveMessage(message);
// 广播到房间
this.server.to(roomId).emit('receiveMessage', message);
return { event: 'messageSent', data: message };
}
@SubscribeMessage('getOnlineUsers')
handleGetOnlineUsers() {
return {
event: 'onlineUsers',
data: Array.from(this.onlineUsers.values()),
};
}
}
5. 注册 Module
// src/chat/chat.module.ts
import { Module } from '@nestjs/common';
import { ChatGateway } from './chat.gateway';
import { ChatService } from './chat.service';
@Module({
providers: [ChatGateway, ChatService],
})
export class ChatModule {}
6. 客户端连接示例
import { io } from 'socket.io-client';
const socket = io('http://localhost:3000/chat', {
extraHeaders: {
Authorization: `Bearer ${token}`,
},
});
// 加入房间
socket.emit('joinRoom', { roomId: 'room-1' });
// 发送消息
socket.emit('sendMessage', { content: 'Hello!', roomId: 'room-1' });
// 接收消息
socket.on('receiveMessage', (message) => {
console.log('New message:', message);
});
// 监听在线用户
socket.on('userJoined', (data) => {
console.log('User joined:', data);
});
// 监听错误
socket.on('error', (err) => {
console.error('WebSocket error:', err);
});
命名空间(Namespace)
当应用有多个实时通信场景时,可以使用命名空间进行隔离:
// 聊天 Gateway
@WebSocketGateway({ namespace: 'chat' })
export class ChatGateway {}
// 通知 Gateway
@WebSocketGateway({ namespace: 'notifications' })
export class NotificationGateway {}
客户端连接时指定命名空间:
const chatSocket = io('http://localhost:3000/chat');
const notificationSocket = io('http://localhost:3000/notifications');
使用场景: 聊天室、通知推送、实时数据看板等业务隔离。
房间(Rooms)管理
Socket.IO 的 Room 是服务端的逻辑分组,客户端可以加入和离开房间。
// 加入房间
@SubscribeMessage('joinRoom')
handleJoinRoom(
@ConnectedSocket() client: Socket,
@MessageBody('roomId') roomId: string,
) {
client.join(roomId);
}
// 广播到房间(不包括发送者)
this.server.to(roomId).emit('receiveMessage', message);
// 广播到房间(包括发送者)
this.server.in(roomId).emit('broadcast', message);
// 发送给特定 socket
this.server.to(socketId).emit('private', message);
// 发送给多个房间
this.server.to(['room1', 'room2']).emit('multiRoom', message);
使用场景: 聊天室消息隔离、频道订阅、分组推送。
动态 Gateway
对于某些场景,可能需要在运行时动态创建 Gateway。NestJS 支持通过 createWebSocketGateway 方法实现:
import { WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
// 动态配置端口
const port = process.env.WS_PORT || 3001;
@WebSocketGateway(port)
export class DynamicGateway {}
总结
NestJS 的 WebSocket Gateway 提供了一套完整的实时通信解决方案:
- 声明式 API — 通过装饰器定义事件处理,与 Controller 风格一致
- 依赖注入 — 完整支持 DI,可轻松注入 Service
- 生态集成 — Guard、Pipe、Interceptor、Exception Filter 无缝集成
- 平台无关 — 支持 Socket.IO 和 ws 两种适配器
关键 API 回顾:
| API | 用途 |
|---|---|
@WebSocketGateway() | 标记 Gateway 类,配置命名空间和传输选项 |
@WebSocketServer() | 注入 Server 实例用于广播 |
@SubscribeMessage() | 订阅客户端事件 |
@MessageBody() | 提取消息载荷 |
@ConnectedSocket() | 获取客户端 Socket 实例 |
@Ack() | 获取 ACK 回调 |
| 生命周期接口 | afterInit / handleConnection / handleDisconnect |
WsException | 抛出 WebSocket 异常 |